Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: introduce StreamManager #134958

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

wenyihu6
Copy link
Contributor

@wenyihu6 wenyihu6 commented Nov 12, 2024

This patch introduces StreamManager, which should be created and persisted for
the lifetime of each node.MuxRangefeed call. It will handle starting and
stopping the underlying wrapped sender, as well as managing individual
streams/rangefeeds. While currently unused, future commits will refactor
the unbuffered sender to use it.

Part of: #110432
Release note: none

Co-authored-by: Steven Danna [email protected]

@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link

blathers-crl bot commented Nov 13, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@wenyihu6 wenyihu6 force-pushed the refactorfirstcommit6 branch 6 times, most recently from 7a5bdda to 39afe4e Compare November 13, 2024 14:24
@wenyihu6 wenyihu6 marked this pull request as ready for review November 13, 2024 14:44
@wenyihu6 wenyihu6 requested review from a team as code owners November 13, 2024 14:44
@wenyihu6
Copy link
Contributor Author

I added more tests for StreamManager in #135075. I found it a bit challenging to test StreamManager without a real UnbufferedSender implementation. Creating a mock sender started to feel like reinventing the wheel of an UnbufferedSender. That said, we can add more simple tests for StreamManager without relying on actual senders. Wdyt?

@wenyihu6
Copy link
Contributor Author

re: https://reviewable.io/reviews/cockroachdb/cockroach/125872#-OBPhJgK60IsNNQ1MGIz

This is a good point, and we have also debated the design here quite a bit. If we want to ensure stream is always added before errors occur, we could either: 1) pass the stream manager as part of the rangefeed.Stream interface to p.Register, or 2) pass it as a separate parameter in stores.Rangefeed. We didn't quite love either approach - changing the Stream interface would require updating all test implementations and make the interface contract less minimal while adding an extra StreamManager parameter to stores.Rangefeed didn't feel right either. We ended up deciding to return the Disconnector to node.MuxRangefeed. The downside is that errors might occur before the stream is added to the StreamManager. We’re addressing this by checking if the stream has been disconnected before adding it and ignoring if it has.

if d.IsDisconnected() {
// If the stream is already disconnected, we don't add it to streams. The
// registration will have already sent an error to the client.
return
. I tried covering this case in a test
t.Run("Disconnect stream after registration with processor but before adding to stream manager",
func(t *testing.T) {
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
defer stopper.Stop(ctx)
stream := sm.NewStream(sID, rID)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
stream, func() {})
require.True(t, registered)
go p.StopWithErr(disconnectErr)
require.Equal(t, 0, testRangefeedCounter.get())
sm.AddStream(sID, d)
expectErrorHandlingInvariance(p)
testServerStream.reset()
})
.

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 4 files at r1, 2 of 2 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @wenyihu6)


pkg/kv/kvserver/rangefeed/stream_manager.go line 76 at r2 (raw file):

	// run is the main loop for the sender. It is expected to run in the
	// background until a node level error is encountered which would shut down
	// all streams in StreamManager.

speaking from the point of view of this being an interface, is run() supposed to launch a goroutine and then return? And should the lifetime of that goroutine be governed by the lifetime of ctx? Or is it supposed to die with cleanup?

Does run even need to be in the interface? IMO it's a bit of an oddity to have a "launcher" method. Could whatever gives us the sender (NewUnbufferedSender() or whatever) kick it off instead?

A similar comment could be made for cleanup. Can the sender clean itself up when it is tearing down its run? In other words, can we start and stop the sender before making the surrounding StreamManager?

Btw I'm happy for all of this to be revisited down the road. These are not blocking comments.


pkg/kv/kvserver/rangefeed/stream_manager.go line 83 at r2 (raw file):

	// sendIsThreadSafe is a no-op declaration method. It is a contract that
	// sendUnbuffered and sendBuffered should be thread-safe.
	sendIsThreadSafe()

I've never seen a marker method like this before. Does this add anything over adding comments to send{UnB,unb}uffered?


pkg/kv/kvserver/rangefeed/stream_manager.go line 113 at r2 (raw file):

}

// DisconnectStream disconnects the stream with the given streamID.

Can you add some words here that explain who calls this (the mux, right?) and why? And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?


pkg/kv/kvserver/rangefeed/stream_manager.go line 193 at r2 (raw file):

		sm.metrics.UpdateMetricsOnRangefeedDisconnect()
	}
	sm.streams.m = make(map[int64]Disconnector)

I assume you're worried about someone calling AddStream after Stop? Can we make it a part of the contract that this does not happen?


pkg/kv/kvserver/rangefeed/stream_manager.go line 198 at r2 (raw file):

// Error returns a channel that can be used to receive errors from sender.run.
// Only non-nil errors are sent on this channel.
func (sm *StreamManager) Error() chan error {

I think you want to return <-chan error (caller can't write to it).
Also, I think only one error is ever received, and only on true error (nothing may ever be sent on this channel). It helps to be specific about the semantics.

@stevendanna stevendanna requested a review from tbg November 15, 2024 17:13
Copy link
Collaborator

@stevendanna stevendanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @tbg and @wenyihu6)


pkg/kv/kvserver/rangefeed/stream_manager.go line 76 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

speaking from the point of view of this being an interface, is run() supposed to launch a goroutine and then return? And should the lifetime of that goroutine be governed by the lifetime of ctx? Or is it supposed to die with cleanup?

Does run even need to be in the interface? IMO it's a bit of an oddity to have a "launcher" method. Could whatever gives us the sender (NewUnbufferedSender() or whatever) kick it off instead?

A similar comment could be made for cleanup. Can the sender clean itself up when it is tearing down its run? In other words, can we start and stop the sender before making the surrounding StreamManager?

Btw I'm happy for all of this to be revisited down the road. These are not blocking comments.

👍 Let's definitely follow up here.


pkg/kv/kvserver/rangefeed/stream_manager.go line 83 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I've never seen a marker method like this before. Does this add anything over adding comments to send{UnB,unb}uffered?

I think this has gotten copy/pasta'd around between a few implementations. I suppose one benefit is that when making a new implementation, you have to type out this method name and perhaps then you think about whether your Send method is in fact thread safe.


pkg/kv/kvserver/rangefeed/stream_manager.go line 113 at r2 (raw file):

And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?

Yes. We could remove it here as well without much harm if we wanted some defense in depth against the registration somehow never sending us the event.


pkg/kv/kvserver/rangefeed/stream_manager.go line 143 at r2 (raw file):

	}
	if _, ok := sm.streams.m[streamID]; ok {
		log.Fatalf(context.Background(), "stream %d already exists", streamID)

I kinda wonder if some of these are really halt-the-process level bugs vs having this return an error and changing this to an AssertionFailedf.


pkg/kv/kvserver/rangefeed/stream_manager.go line 163 at r2 (raw file):

	sm.wg.Add(1)
	ctx, sm.taskCancel = context.WithCancel(ctx)
	if err := stopper.RunAsyncTask(ctx, "buffered stream output", func(ctx context.Context) {

s/buffered stream out/stream manager SOMETHING/

But, this goes to Tobi's point that perhaps the sender should be responsible for the actual launching of this task.


pkg/kv/kvserver/rangefeed/stream_manager.go line 190 at r2 (raw file):

		// terminate.
		disconnector.Disconnect(
			kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)))

One would hope the compiler would do this for you, but we could move this error construction out of the loop.


pkg/kv/kvserver/rangefeed/stream_manager.go line 191 at r2 (raw file):

		disconnector.Disconnect(
			kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)))
		sm.metrics.UpdateMetricsOnRangefeedDisconnect()

If we gave this an argument, we could call it once with len(sm.streams.m)

@wenyihu6 wenyihu6 force-pushed the refactorfirstcommit6 branch 3 times, most recently from d9968ef to dfae2eb Compare November 15, 2024 21:09
This patch updates RangefeedMetricsRecorder to include
UpdateMetricsOnRangefeedDisconnectBy, which decrements rangefeed metrics by the
given argument. While the benefits remain unclear, future commits will use
this method to clean up metrics during multiple rangefeed disconnects.

Epic: none
Release note: none
This patch introduces StreamManager, which should be created and persisted for
the lifetime of each node.MuxRangefeed call. It will handle starting and
stopping the underlying wrapped sender, as well as managing individual
streams/rangefeeds. While currently unused, future commits will refactor
the unbuffered sender to use it.

Part of: cockroachdb#110432
Release note: none

Co-authored-by: Steven Danna [email protected]
Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @tbg)


pkg/kv/kvserver/rangefeed/stream_manager.go line 76 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

👍 Let's definitely follow up here.

Agreed, I recall Steven has mentioned something similar before. Definitely looks like something we should revisit - filed an issue to track these targeted follow ups #135332.


pkg/kv/kvserver/rangefeed/stream_manager.go line 83 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

I think this has gotten copy/pasta'd around between a few implementations. I suppose one benefit is that when making a new implementation, you have to type out this method name and perhaps then you think about whether your Send method is in fact thread safe.

I added them when I was reading this review https://reviewable.io/reviews/cockroachdb/cockroach/125872#-OBPgnLn2srEopeTC53V and thought this method would give us a nice comment/place to discuss thread-safety for the implementations. I've removed this from the interface and added two comments for the send{UnB,B} instead.


pkg/kv/kvserver/rangefeed/stream_manager.go line 113 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?

Yes. We could remove it here as well without much harm if we wanted some defense in depth against the registration somehow never sending us the event.

+1, we're relying on the r.Disconnect method to handle de-dup and ensure this is idempotent. It checks whether r.mu.disconnected is set.


pkg/kv/kvserver/rangefeed/stream_manager.go line 143 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

I kinda wonder if some of these are really halt-the-process level bugs vs having this return an error and changing this to an AssertionFailedf.

Agreed, we have a few log.Fatal here which may worth a second though - tracked here #135332 as well.


pkg/kv/kvserver/rangefeed/stream_manager.go line 163 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

s/buffered stream out/stream manager SOMETHING/

But, this goes to Tobi's point that perhaps the sender should be responsible for the actual launching of this task.

Renamed to stream-manager-sender. I will address the second part of the comment here #135332.


pkg/kv/kvserver/rangefeed/stream_manager.go line 190 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

One would hope the compiler would do this for you, but we could move this error construction out of the loop.

Good point, done.


pkg/kv/kvserver/rangefeed/stream_manager.go line 191 at r2 (raw file):

Previously, stevendanna (Steven Danna) wrote…

If we gave this an argument, we could call it once with len(sm.streams.m)

Done.


pkg/kv/kvserver/rangefeed/stream_manager.go line 193 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I assume you're worried about someone calling AddStream after Stop? Can we make it a part of the contract that this does not happen?

It should be impossible to call AddStream or DisconnectStream after Stop, as this could lead to incomplete cleanup of metrics and rangefeeds. Stop is only called during the defer of node.MuxRangefeed. Once Stop is called, no additional calls to AddStream or DisconnectStream should occur. https://github.com/wenyihu6/cockroach/blob/6db61edc4668f63a396ff1e9e7cd001e20b13c28/pkg/server/node.go#L2164 I only cleaned up the map because it seemed like the right thing to do, ensuring the map is cleaned up after the rangefeeds have been disconnected.


pkg/kv/kvserver/rangefeed/stream_manager.go line 198 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I think you want to return <-chan error (caller can't write to it).
Also, I think only one error is ever received, and only on true error (nothing may ever be sent on this channel). It helps to be specific about the semantics.

Clarified the semantics. How does the comment look now?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants